///|
pub struct FutureVTable[T] {
  new : () -> UInt64
  read : (Int, Int) -> Int
  write : (Int, Int) -> Int
  cancel_read : (Int) -> Int
  cancel_write : (Int) -> Int
  drop_readable : (Int) -> Unit
  drop_writable : (Int) -> Unit
  malloc : (Int) -> Int
  free : (Int) -> Unit
  lift : (Int) -> T
  lower : (T, Int) -> Unit
}

///|
pub fn[T] FutureVTable::new(
  new : () -> UInt64,
  read : (Int, Int) -> Int,
  write : (Int, Int) -> Int,
  cancel_read : (Int) -> Int,
  cancel_write : (Int) -> Int,
  drop_readable : (Int) -> Unit,
  drop_writable : (Int) -> Unit,
  malloc : (Int) -> Int,
  free : (Int) -> Unit,
  lift : (Int) -> T,
  lower : (T, Int) -> Unit,
) -> FutureVTable[T] {
  {
    new,
    read,
    write,
    cancel_read,
    cancel_write,
    drop_readable,
    drop_writable,
    malloc,
    free,
    lift,
    lower,
  }
}

///|
pub fn[T] new_future(
  vtable : FutureVTable[T],
) -> (FutureReader[T], FutureWriter[T]) {
  let handle = (vtable.new)()
  let left_handle = handle.to_int()
  let right_handle = (handle >> 32).to_int()
  (
    FutureReader::new(left_handle, vtable),
    FutureWriter::new(right_handle, vtable),
  )
}

///|
pub struct FutureReader[T] {
  handle : Int
  vtable : FutureVTable[T]
  mut code : Int?
  mut dropped : Bool 
  memory_refs : Array[Int]
}

///|
pub fn[T] FutureReader::new(
  handle : Int,
  vtable : FutureVTable[T],
) -> FutureReader[T] {
  { handle, vtable, code: None, memory_refs: [], dropped: false }
}

///|
pub impl[T] Waitable for FutureReader[T] with update(self, code~ : Int) -> Unit {
  self.code = Some(code)
}

pub impl[T] Eq for FutureReader[T] with equal(self, other) -> Bool {
  self.handle == other.handle
}

///|
pub impl[T] Waitable for FutureReader[T] with handle(self) -> Int {
  self.handle
}

///|
pub impl[T] Waitable for FutureReader[T] with cancel(self) -> Unit {
  if self.code is Some(code) && WaitableStatus::decode(code) is Cancelled(_) {
    return
  }
  self.code = Some((self.vtable.cancel_read)(self.handle))
}

///|
pub impl[T] Waitable for FutureReader[T] with drop(self) -> Bool {
  _async_debug("stream-reader-drop(\{self.handle})")
  if self.dropped {
    return false
  }
  (self.vtable.drop_readable)(self.handle)
  self.dropped = true
  for ptr in self.memory_refs {
    self.free(ptr)
  }
  true
}

///|
pub impl[T] Waitable for FutureReader[T] with done(self) -> Bool {
  match self.code {
    Some(c) =>
      match WaitableStatus::decode(c) {
        Completed(_) | Dropped(_) | Cancelled(_) => true
        Blocking => false
      }
    None => false
  }
}

///|
pub fn[T] FutureReader::malloc(self : FutureReader[T]) -> Int {
  let ptr = (self.vtable.malloc)(1)
  ptr
}

///|
pub fn[T] FutureReader::free(self : FutureReader[T], ptr : Int) -> Unit {
  (self.vtable.free)(ptr)
}

///|
pub fn[T] FutureReader::lift(self : FutureReader[T], ptr : Int) -> T {
  let res = (self.vtable.lift)(ptr)
  res
}

///|
pub fn[T] FutureReader::lower_read(self : FutureReader[T], ptr : Int) -> Int {
  (self.vtable.read)(self.handle, ptr)
}

///|
pub async fn[T] FutureReader::read(self : FutureReader[T]) -> T raise {
  let buf_ptr = self.malloc()
  self.memory_refs.push(buf_ptr)
  self.code = Some(self.lower_read(buf_ptr))
  _async_debug("future-read(\{self.handle}) -> \{self.code.unwrap()}")
  // register this waitable to the current task
  let task = current_task()
  task.add_waitable(self, current_coroutine())
  defer task.remove_waitable(self)

  // wait until ready
  for {
    let status = WaitableStatus::decode(self.code.unwrap())
    match status {
      Cancelled(_) | Dropped(_) => raise Cancelled::Cancelled
      Completed(_) => break
      Blocking => suspend()
    }
  }
  // when receive event, continue this coroutine
  let value = self.lift(buf_ptr)
  return value
}

///|
pub struct FutureWriter[T] {
  handle : Int
  vtable : FutureVTable[T]
  mut code : Int?
  mut dropped : Bool
  memory_refs : Array[Int]
}

///|
pub fn[T] FutureWriter::new(
  handle : Int,
  vtable : FutureVTable[T],
) -> FutureWriter[T] {
  { handle, vtable, code: None, memory_refs: [], dropped: false }
}

///|
pub impl[T] Waitable for FutureWriter[T] with update(self, code~ : Int) -> Unit {
  self.code = Some(code)
}

pub impl[T] Eq for FutureWriter[T] with equal(self, other) -> Bool {
  self.handle == other.handle
}

///|
pub impl[T] Waitable for FutureWriter[T] with handle(self) -> Int {
  self.handle
}

///|
pub impl[T] Waitable for FutureWriter[T] with cancel(self) -> Unit {
  if self.code is Some(code) && WaitableStatus::decode(code) is Cancelled(_) {
    return
  }
  self.code = Some((self.vtable.cancel_write)(self.handle))
}

///|
pub impl[T] Waitable for FutureWriter[T] with drop(self) -> Bool {
  _async_debug("stream-writer-drop(\{self.handle})")
  if self.dropped {
    return false
  }
  (self.vtable.drop_writable)(self.handle)
  self.dropped = true
  for ptr in self.memory_refs {
    self.free(ptr)
  }
  true
}

///|
pub impl[T] Waitable for FutureWriter[T] with done(self) -> Bool {
  match self.code {
    Some(c) =>
      match WaitableStatus::decode(c) {
        Completed(_) | Dropped(_) | Cancelled(_) => true
        Blocking => false
      }
    None => false
  }
}

///|
pub fn[T] FutureWriter::malloc(self : FutureWriter[T]) -> Int {
  (self.vtable.malloc)(1)
}

///|
pub fn[T] FutureWriter::free(self : FutureWriter[T], ptr : Int) -> Unit {
  (self.vtable.free)(ptr)
}

///|
pub fn[T] FutureWriter::lower(
  self : FutureWriter[T],
  value : T,
  ptr : Int,
) -> Unit {
  (self.vtable.lower)(value, ptr)
}

///|
pub fn[T] FutureWriter::lower_write(self : FutureWriter[T], ptr : Int) -> Int {
  (self.vtable.write)(self.handle, ptr)
}

///|
pub async fn[T] FutureWriter::write(
  self : FutureWriter[T],
  value : T,
) -> Unit raise {
  // register this waitable to the current task
  let task = current_task()
  task.add_waitable(self, current_coroutine())
  defer task.remove_waitable(self)

  let buf_ptr = self.malloc()
  self.memory_refs.push(buf_ptr)
  self.lower(value, buf_ptr)
  self.code = Some(self.lower_write(buf_ptr))
  defer self.free(buf_ptr)

  // wait until ready
  for {
    let status = WaitableStatus::decode(self.code.unwrap())
    match status {
      Cancelled(_) | Dropped(_) => raise Cancelled::Cancelled
      Completed(_) => break
      Blocking => suspend()
    }
  }
  // when receive event, continue this coroutine
  return
}

///|
pub suberror StreamCancelled (Int, Cancelled) derive(Show)

///|
pub struct StreamVTable[T] {
  new : () -> UInt64
  read : (Int, Int, Int) -> Int
  write : (Int, Int, Int) -> Int
  cancel_read : (Int) -> Int
  cancel_write : (Int) -> Int
  drop_readable : (Int) -> Unit
  drop_writable : (Int) -> Unit
  malloc : (Int) -> Int
  free : (Int) -> Unit
  lift : (Int, Int) -> FixedArray[T]
  lower : (FixedArray[T]) -> Int
}

///|
pub fn[T] StreamVTable::new(
  new : () -> UInt64,
  read : (Int, Int, Int) -> Int,
  write : (Int, Int, Int) -> Int,
  cancel_read : (Int) -> Int,
  cancel_write : (Int) -> Int,
  drop_readable : (Int) -> Unit,
  drop_writable : (Int) -> Unit,
  malloc : (Int) -> Int,
  free : (Int) -> Unit,
  lift : (Int, Int) -> FixedArray[T],
  lower : (FixedArray[T]) -> Int,
) -> StreamVTable[T] {
  {
    new,
    read,
    write,
    cancel_read,
    cancel_write,
    drop_readable,
    drop_writable,
    malloc,
    free,
    lift,
    lower,
  }
}

pub fn[T] new_stream(
  vtable : StreamVTable[T],
) -> (StreamReader[T], StreamWriter[T]) {
  let handle = (vtable.new)()
  let left_handle = handle.to_int()
  let right_handle = (handle >> 32).to_int()
  (
    StreamReader::new(left_handle, vtable),
    StreamWriter::new(right_handle, vtable),
  )
}


///|
pub struct StreamReader[T] {
  handle : Int
  vtable : StreamVTable[T]
  mut code : Int?
  mut dropped : Bool
  memory_refs : Array[Int]
}

///|
pub impl[T] Waitable for StreamReader[T] with update(self, code~ : Int) -> Unit {
  self.code = Some(code)
}

pub impl[T] Eq for StreamReader[T] with equal(self, other) -> Bool {
  self.handle == other.handle
}


///|
pub impl[T] Waitable for StreamReader[T] with handle(self) -> Int {
  self.handle
}

///|
pub impl[T] Waitable for StreamReader[T] with cancel(self) -> Unit {
  if self.code is Some(code) && WaitableStatus::decode(code) is Cancelled(_) {
    return
  }
  self.code = Some((self.vtable.cancel_read)(self.handle))
}

///|
pub impl[T] Waitable for StreamReader[T] with drop(self) -> Bool {
  _async_debug("stream-reader-drop(\{self.handle})")
  if self.dropped {
    return false
  }
  (self.vtable.drop_readable)(self.handle)
  self.dropped = true
  for ptr in self.memory_refs {
    (self.vtable.free)(ptr)
  }
  true
}

///|
pub impl[T] Waitable for StreamReader[T] with done(self) -> Bool {
  match self.code {
    Some(c) =>
      match WaitableStatus::decode(c) {
        Completed(_) | Dropped(_) | Cancelled(_) => true
        Blocking => false
      }
    None => false
  }
}

///|
pub fn[T] StreamReader::new(
  handle : Int,
  vtable : StreamVTable[T],
) -> StreamReader[T] {
  { handle, vtable, code: None, memory_refs: [], dropped: false}
}

///|
pub async fn[T] StreamReader::read(
  self : StreamReader[T],
  buffer : FixedArray[T],
  offset? : Int = 0,
  length : Int,
) -> Int raise {
  // register this waitable to the current task
  let task = current_task()
  task.add_waitable(self, current_coroutine())
  defer task.remove_waitable(self)

  let buf_ptr = (self.vtable.malloc)(length)
  self.code = Some((self.vtable.read)(self.handle, buf_ptr, length))

  _async_debug("stream-read(\{self.handle}) -> \{self.code.unwrap()}")
  for {
    let status = WaitableStatus::decode(self.code.unwrap())
    match status {
      Completed(n) => {
        let read_result = (self.vtable.lift)(buf_ptr, n)
        for i in 0..<n {
          buffer[offset + i] = read_result[i]
        }
        return n
      }
      Cancelled(n) | Dropped(n) => {
        let read_result = (self.vtable.lift)(buf_ptr, n)
        for i in 0..<n {
          buffer[offset + i] = read_result[i]
        }
        raise StreamCancelled::StreamCancelled((n, Cancelled::Cancelled))
      }
      Blocking => suspend()
    }
  }
}

///|
pub struct StreamWriter[T] {
  handle : Int
  vtable : StreamVTable[T]
  mut code : Int?

  mut dropped : Bool
  memory_refs : Array[Int]
}

///|
pub impl[T] Waitable for StreamWriter[T] with update(self, code~ : Int) -> Unit {
  self.code = Some(code)
}

pub impl[T] Eq for StreamWriter[T] with equal(self, other) -> Bool {
  self.handle == other.handle
}


///|
pub impl[T] Waitable for StreamWriter[T] with handle(self) -> Int {
  self.handle
}

///|
pub impl[T] Waitable for StreamWriter[T] with cancel(self) -> Unit {
  if self.code is Some(code) && WaitableStatus::decode(code) is Cancelled(_) {
    return
  }
  self.code = Some((self.vtable.cancel_write)(self.handle))
}

///|
pub impl[T] Waitable for StreamWriter[T] with drop(self) -> Bool {
  _async_debug("stream-writer-drop(\{self.handle})")
  let task = current_task()
  let coro = task.children.get(self.handle)
  if coro is Some((_, coro)) {
    coro.cancel()
    coro.wake()
  }
  if self.dropped {
    return false
  }
  (self.vtable.drop_writable)(self.handle)
  self.dropped = true
  for ptr in self.memory_refs {
    (self.vtable.free)(ptr)
  }
  true
}

///|
pub impl[T] Waitable for StreamWriter[T] with done(self) -> Bool {
  match self.code {
    Some(c) =>
      match WaitableStatus::decode(c) {
        Completed(_) | Dropped(_) | Cancelled(_) => true
        Blocking => false
      }
    None => false
  }
}

///|
pub fn[T] StreamWriter::new(
  handle : Int,
  vtable : StreamVTable[T],
) -> StreamWriter[T] {
  { handle, vtable, code: None, memory_refs: [], dropped: false}
}

///|
pub async fn[T] StreamWriter::write(
  self : StreamWriter[T],
  buffer : FixedArray[T],
) -> Int raise {
  // register this waitable to the current task
  let task = current_task()
  task.add_waitable(self, current_coroutine())
  defer task.remove_waitable(self)

  let write_buf = (self.vtable.lower)(buffer)
  self.code = Some((self.vtable.write)(self.handle, write_buf, buffer.length()))
  for {
    let status = WaitableStatus::decode(self.code.unwrap())
    match status {
      Completed(n) => return n
      Cancelled(n) | Dropped(n) =>
        raise StreamCancelled::StreamCancelled((n, Cancelled::Cancelled))
      Blocking => suspend()
    }
  }
}
